package com.atguigu.flink.state.operatorstate;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * Created by Smexy on 2023/11/15

 与ListState类似，唯一区别在于。状态恢复后，对状态的分配。
 ListState是将所有的状态尽量均匀地分配到多个Task，每个Task只获取状态的一部分。
 而UnionListState则是将状态的全部，分配给所有的Task。

 UnionListState目前仅仅在KafkaSource使用。

    举例：
            有消费者组 test1，有两个消费者。 KafkaSource算子有2个Task。消费topicA(3个分区)
             source 2-1 消费 A-0, offset=23
             source 2-1 消费 A-1, offset=22
             source 2-2 消费 A-2, offset=21

            一旦程序故障了，重启程序，从之前备份的offset恢复数据
                test1 消费 A-0, offset=23,消费 A-1, offset=22,消费 A-2, offset=21
            会初始化2个Task，为两个Task分配消费的分区(随机，负载均衡)。

                source 2-1 消费 A-0, offset=23

                source 2-2 消费 A-1, offset=22
                source 2-2 消费 A-2, offset=21
            在以上场景下，如果使用ListState，均匀分配offsets可能导致某个Task找不到要消费的分区的offsets，因此KafkaSource使用的是UnionListState





 */
public class Demo2_UnionListState
{
    public static void main(String[] args) {
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //每2s备份一次状态，当开启了checkpoint时，默认就是无限重启
        env.enableCheckpointing(2000);
        env.setParallelism(2);
                env
                   .socketTextStream("hadoop102", 8888)
                   .map(new MyMapFunction())
                    .addSink(new SinkFunction<String>()
                    {
                        @Override
                        public void invoke(String value, Context context) throws Exception {
                            if (value.contains("x")){
                                //手动模拟故障
                                throw new RuntimeException("出故障了...");
                            }
                            System.out.println(value);
                        }
                    });

        
                try {
                            env.execute();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
        
    }

    private static class MyMapFunction implements MapFunction<String,String>, CheckpointedFunction
    {

        private ListState<String> result ;

        @Override
        public String map(String value) throws Exception {
            //写状态
            result.add(value);
            //读状态
            return result.get().toString();
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("MyMapFunction.snapshotState");
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("MyMapFunction.initializeState");

            OperatorStateStore operatorStateStore = context.getOperatorStateStore();
            ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("1", String.class);
            //UnionListState在使用时大部分和ListState是一样的
           // result = operatorStateStore.getListState(listStateDescriptor);
            result = operatorStateStore.getUnionListState(listStateDescriptor);
        }
    }
}
